"""
Case Type   : 发布订阅
Case Name   : 开启发布订阅后,订阅端主备切换
Create At   : 2023-01-10
Owner       : opentestcase004
Description :
    1.在两个集群创建表
    2.创建发布订阅
    3.发布端插入数据
    4.订阅端查询数据是否更新
    5.订阅端主备切换
    6.发布端删除数据
    7.订阅端查询数据是否更新
    8.订阅端恢复主备关系
    9.发布端修改数据
    10.订阅端查询数据是否更新
    11.清理环境
Expect      :
    1.成功
    2.成功
    3.成功
    4.数据更新
    5.成功
    6.成功
    7.数据更新
    8.成功
    9.成功
    10.数据更新
    11.成功
History     :
    modified by opentestcase004 2023/01/13:修改step9断言
    Modified by opentestcase012 2023/4/21:优化发布端调用方法,teardown中添加主备切换恢复
    Modified by opentestcase012 2023/5/5:优化用例,调整teardown中执行顺序,避免异常情况下环境未清理干净
"""

import os
import unittest
from yat.test import macro
from yat.test import Node
from testcase.utils.Common import Common
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Constant import Constant
from testcase.utils.Logger import Logger

# Shell helper for the 'PrimaryDbUser' node. It is created at import time
# because the class-level skipIf decorator below calls get_node_num() on it
# before any test instance (and therefore any setUp) exists.
Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    """Publication/subscription case with a subscriber-side switchover.

    The case creates the same tables on a publishing cluster and on a
    subscribing cluster, links them with a publication and a subscription,
    and checks that inserts, deletes and updates issued on the publisher
    show up on the subscriber before the switchover, while the former
    standby is queried, and after the original roles are restored.

    The whole class is skipped unless ``Primary_SH.get_node_num()``
    reports exactly 3 nodes.
    """

    def setUp(self):
        """Create node handles, back up pg_hba.conf and configure clusters.

        Records the current ``wal_level`` and ``enable_slot_log`` values so
        that tearDown can write them back, copies each primary's
        ``pg_hba.conf`` to the parent directory of its instance path, adds
        host rules through ``gs_guc reload`` on both sides, sets
        ``wal_level=logical`` on the publisher and restarts it.
        """
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f"-----{os.path.basename(__file__)} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub_sta = Node(node='remote1_Standby1DbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub_sta = CommonSH('remote1_Standby1DbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.com_sub = Common('remote1_PrimaryDbUser')
        self.tb_name1 = 't_pub_sub_0060_1'
        self.tb_name2 = 't_pub_sub_0060_2'
        self.subname = "s_pub_sub_0060"
        self.pubname = "p_pub_sub_0060"
        self.parent_path_pub = os.path.dirname(macro.DB_INSTANCE_PATH)
        self.parent_path_sub = os.path.dirname(macro.DB_INSTANCE_PATH_REMOTE1)
        # The subscription connects to the publisher on db_port + 1.
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        # Read the original values before anything is modified.
        self.wal_level = self.com_pub.show_param("wal_level")
        self.enable_slot_log = self.com_pub.show_param("enable_slot_log")
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
            f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
            f'-W {self.pri_userdb_sub.db_password}'

        # Back up pg_hba.conf one directory above each instance path;
        # tearDown moves these copies back over the modified files.
        backups = (
            (self.pri_userdb_pub, macro.DB_INSTANCE_PATH,
             self.parent_path_pub),
            (self.pri_userdb_sub, macro.DB_INSTANCE_PATH_REMOTE1,
             self.parent_path_sub),
        )
        for node, instance_path, backup_dir in backups:
            cmd = f"cp {os.path.join(instance_path, 'pg_hba.conf')} " \
                f"{os.path.join(backup_dir, 'pg_hba.conf')};"
            self.log.info(cmd)
            result = node.sh(cmd).result()
            self.log.info(result)
        self.hostname = self.pri_userdb_sub.sh('hostname').result().strip()

        text = "--step:预置条件,修改pg_hba expect:成功"
        self.log.info(text)
        # Publisher side: add a replication rule and an ordinary rule for
        # the subscriber primary, then for the subscriber standby.
        for peer in (self.pri_userdb_sub, self.pri_userdb_sub_sta):
            for scope in ('replication', 'all'):
                guc_res = self.commsh_pub.execute_gsguc(
                    'reload', self.constant.GSGUC_SUCCESS_MSG, '',
                    'all', False, False, '',
                    f'host    {scope}  {peer.db_user} '
                    f'{peer.db_host}/32 sha256')
                self.log.info(guc_res)
                self.assertTrue(guc_res, '执行失败' + text)

        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
        self.assertTrue(result, '执行失败' + text)
        result = self.commsh_pub.restart_db_cluster(True)
        self.assertTrue(self._restart_ok(result), '执行失败' + text)

        # Subscriber side: add rules for the publisher, issued first through
        # the subscriber primary handle and then through the standby handle.
        # The two rule prefixes are kept exactly as they were written.
        for shell in (self.commsh_sub, self.commsh_sub_sta):
            for prefix in ('host    replication  ', 'host all  '):
                guc_res = shell.execute_gsguc(
                    'reload', self.constant.GSGUC_SUCCESS_MSG, '',
                    'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
                    f'{prefix}{self.pri_userdb_pub.db_user} '
                    f'{self.pri_userdb_pub.db_host}/32 sha256',
                    macro.DB_ENV_PATH_REMOTE1)
                self.log.info(guc_res)
                self.assertTrue(guc_res, '执行失败' + text)

    def _restart_ok(self, output):
        """Return True if restart output reports success or 'Degrade'."""
        return (self.constant.START_SUCCESS_MSG in output
                or 'Degrade' in output)

    def _run_pub_sql(self, sql):
        """Run *sql* through the publisher shell; log and return output."""
        output = self.commsh_pub.execut_db_sql(
            sql, sql_type=self.user_param_pub)
        self.log.info(output)
        return output

    def _run_sub_sql(self, shell, sql):
        """Run *sql* through a subscriber shell; log and return output."""
        output = shell.execut_db_sql(
            sql, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(output)
        return output

    def _switchover(self, shell, success_msg):
        """Issue ``switchover`` through *shell*; log and return the result."""
        outcome = shell.execute_gsctl(
            'switchover', success_msg,
            env_path=macro.DB_ENV_PATH_REMOTE1,
            dn_path=macro.DB_INSTANCE_PATH_REMOTE1)
        self.log.info(outcome)
        return outcome

    def _refresh_conf(self, shell):
        """Call ``exec_refresh_conf`` on *shell*; log and return the result."""
        outcome = shell.exec_refresh_conf(env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(outcome)
        return outcome

    def test_pub_sub(self):
        """Run steps 1-10: replicate DML across a subscriber switchover."""
        text = "--step1:在两个集群创建表 expect:成功--"
        self.log.info(text)
        sql_cmd = (
            f"drop table if exists {self.tb_name1};"
            f"drop table if exists {self.tb_name2};"
            f"create table {self.tb_name1}(id1 int primary key,id2 int,id3 "
            f"int);"
            f"create table {self.tb_name2}(id int primary key constraint "
            f"id_nn not null,use_filename varchar2(20),filename "
            f"varchar2(255),text varchar2(2000))"
            f"partition by range(id)"
            f"( "
            f"      partition p1 values less than(30),"
            f"      partition P2 values less than(60),"
            f"      partition P3 values less than(90),"
            f"      partition P4 values less than(MAXVALUE)"
            f");"
        )
        # The same DDL runs on both clusters and is checked the same way.
        pub_result = self._run_pub_sql(sql_cmd)
        self.assertEqual(pub_result.count(self.constant.DROP_TABLE_SUCCESS),
                         2, '执行失败' + text)
        self.assertEqual(pub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败' + text)
        sub_result = self._run_sub_sql(self.commsh_sub, sql_cmd)
        self.assertEqual(sub_result.count(self.constant.DROP_TABLE_SUCCESS),
                         2, '执行失败' + text)
        self.assertEqual(sub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败' + text)

        text = "--step2:创建发布订阅 expect:成功--"
        self.log.info(text)
        sql_cmd = f"drop publication if exists {self.pubname};" \
            f"create publication {self.pubname} for all tables;"
        pub_result = self._run_pub_sql(sql_cmd)
        self.assertIn(self.constant.drop_pub_succ_msg, pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_pub_succ_msg, pub_result,
                      '执行失败' + text)
        # execute_generate is run on the subscriber before the subscription
        # is created; its output must contain create_keycipher_success.
        result = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.create_keycipher_success, result,
                      '执行失败' + text)
        sql_cmd = f"drop subscription if exists {self.subname};" \
            f"create subscription {self.subname} connection " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} user={self.pri_userdb_pub.db_user} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.ssh_password}' publication " \
            f"{self.pubname};"
        sub_result = self._run_sub_sql(self.commsh_sub, sql_cmd)
        self.assertIn(self.constant.drop_sub_succ_msg, sub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_sub_succ_msg, sub_result,
                      '执行失败' + text)

        text = "--step3:发布端插入数据 expect:成功--"
        self.log.info(text)
        sql_cmd = f"insert into {self.tb_name1} values(1, 1, 1),(2,2,2);" \
            f"insert into {self.tb_name2} values(1,'first','%一',''),(60," \
            f"'first','%二',''),(90,'first','%三',''); "
        pub_result = self._run_pub_sql(sql_cmd)
        self.assertEqual(pub_result.count(self.constant.INSERT_SUCCESS_MSG), 2,
                         '执行失败' + text)

        text = "--step4:订阅端查询数据是否更新 expect:数据更新--"
        self.log.info(text)
        # pg_sleep gives the subscriber time to apply the changes before
        # the table is read.
        sql_select = f"select pg_sleep(5.5);select * from {self.tb_name1};"
        sub_result = self._run_sub_sql(self.commsh_sub, sql_select)
        for expected_row in ("1 |   1 |   1", "2 |   2 |   2"):
            self.assertIn(expected_row, sub_result, '执行失败' + text)
        sql_select = f"select pg_sleep(5.5);" \
            f"select * from {self.tb_name2};"
        sub_result = self._run_sub_sql(self.commsh_sub, sql_select)
        for expected_row in ("1 | first        | %一",
                             "60 | first        | %二",
                             "90 | first        | %三"):
            self.assertIn(expected_row, sub_result, '执行失败' + text)

        text = "--step5:订阅端主备切换 expect:成功--"
        self.log.info(text)
        # The switchover is issued through the subscriber standby handle.
        sub_result = self._switchover(
            self.commsh_sub_sta, self.constant.SWITCH_SUCCESS_MSG)
        self.assertTrue(sub_result, '执行失败' + text)
        sub_result = self._refresh_conf(self.commsh_sub_sta)
        self.assertTrue(sub_result, '执行失败' + text)

        text = "--step6:发布端删除数据 expect:成功--"
        self.log.info(text)
        sql_cmd = f"delete from {self.tb_name1} where id1=1;" \
            f"delete from {self.tb_name2} where id=1;"
        pub_result = self._run_pub_sql(sql_cmd)
        self.assertEqual(pub_result.count(self.constant.DELETE_SUCCESS_MSG), 2,
                         '执行失败' + text)

        text = "--step7:订阅端查询数据是否更新 expect:数据更新--"
        self.log.info(text)
        # After the switchover, execute_generate and the queries go through
        # the former standby handle.
        result = self.commsh_sub_sta.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.create_keycipher_success, result,
                      '执行失败' + text)
        sql_select = f"select pg_sleep(20);" \
            f"select * from {self.tb_name1} where id1=1;"
        sub_result = self._run_sub_sql(self.commsh_sub_sta, sql_select)
        self.assertIn('(0 rows)', sub_result, '执行失败' + text)
        sql_select = f"select * from {self.tb_name2} where id=1;"
        sub_result = self._run_sub_sql(self.commsh_sub_sta, sql_select)
        self.assertIn('(0 rows)', sub_result, '执行失败' + text)

        text = "--step8:订阅端恢复主备关系 expect:成功--"
        self.log.info(text)
        # Switch back through the original subscriber primary handle.
        sub_result = self._switchover(
            self.commsh_sub, self.constant.SWITCHOVER_SUCCESS_MSG)
        self.assertTrue(sub_result, '执行失败' + text)
        sub_result = self._refresh_conf(self.commsh_sub)
        self.assertTrue(sub_result, '执行失败' + text)

        text = "--step9:发布端修改数据 expect:成功--"
        self.log.info(text)
        sql_cmd = f"update {self.tb_name1} set id3=999;update " \
            f"{self.tb_name2} set id=999 where id=90;"
        pub_result = self._run_pub_sql(sql_cmd)
        self.assertIn(self.constant.UPDATE_SUCCESS_MSG, pub_result,
                      '执行失败' + text)

        text = "--step10:订阅端查询数据是否更新 expect:数据更新--"
        self.log.info(text)
        # splitlines()[-2] is the line just above the last output line;
        # both checks expect that line to be the value 999.
        sql_select = f"select pg_sleep(20);select id3 from {self.tb_name1};"
        sub_result = self._run_sub_sql(self.commsh_sub, sql_select)
        self.assertEqual(str(sub_result.splitlines()[-2]).strip(), '999',
                         '执行失败' + text)
        sql_select = f"select pg_sleep(5.5);select id from {self.tb_name2};"
        sub_result = self._run_sub_sql(self.commsh_sub, sql_select)
        self.assertEqual(str(sub_result.splitlines()[-2]).strip(), '999',
                         '执行失败' + text)

    def tearDown(self):
        """Step 11: undo everything setUp and the test changed.

        Every cleanup action runs before the first assertion, so a failing
        check cannot leave later cleanup actions unexecuted.
        """
        text = "--step11:清理环境 expect:成功--"
        self.log.info(text)
        sub_result1 = self._switchover(
            self.commsh_sub, self.constant.SWITCH_SUCCESS_MSG)
        sub_result2 = self._refresh_conf(self.commsh_sub)

        # Drop the subscription before the publication it refers to.
        drop_sub_result = self._run_sub_sql(
            self.commsh_sub, f"drop subscription if exists {self.subname};")
        drop_pub_result = self._run_pub_sql(
            f"drop publication if exists {self.pubname};")
        sql_cmd = f"drop table if exists {self.tb_name1};" \
            f"drop table if exists {self.tb_name2};"
        result_sub = self._run_sub_sql(self.commsh_sub, sql_cmd)
        result_pub = self._run_pub_sql(sql_cmd)

        # Move the pg_hba.conf backups taken in setUp back into place.
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(sh_result)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_sub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result1 = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(sh_result1)

        # Write back the parameter values recorded in setUp.
        result1 = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        self.log.info(result1)
        result2 = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG,
            f'enable_slot_log={self.enable_slot_log}')
        self.log.info(result2)
        pub_result = self.commsh_pub.restart_db_cluster(True)
        self.log.info(pub_result)
        sub_result = self.commsh_sub.restart_db_cluster(
            True, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)

        self.assertTrue(sub_result1, '执行失败' + text)
        self.assertTrue(sub_result2, '执行失败' + text)
        # restart_db_cluster(True, ...) hands back the command output (setUp
        # searches it with ``in``), so its truthiness says nothing about
        # success; check the content the same way setUp does.
        self.assertTrue(self._restart_ok(pub_result), '执行失败' + text)
        self.assertTrue(self._restart_ok(sub_result), '执行失败' + text)
        self.assertTrue(result1, '执行失败' + text)
        self.assertTrue(result2, '执行失败' + text)
        self.assertEqual("", sh_result, '执行失败' + text)
        self.assertEqual("", sh_result1, '执行失败' + text)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.assertEqual(result_sub.count(self.constant.DROP_TABLE_SUCCESS), 2,
                         '执行失败' + text)
        self.assertEqual(result_pub.count(self.constant.DROP_TABLE_SUCCESS), 2,
                         '执行失败' + text)
        self.log.info(f"-----{os.path.basename(__file__)} end-----")
